Dataset: For this exercise we will use a dataset hosted at http://ai.stanford.edu/~amaas/data/sentiment/
Problem Statement: This is a dataset for binary sentiment classification containing substantially more data than previous benchmark datasets. The dataset provides 25,000 highly polar movie reviews for training and 25,000 for testing, plus additional unlabeled data. Raw text and an already processed bag-of-words format are available.
Launch a Spark session and verify the Spark UI URL.
In [1]:
spark.sparkContext.uiWebUrl
Out[1]:
The IMDB comments dataset has been stored at the following location.
In [2]:
!wc -l data/imdb-comments.json
There are 50,000 lines in the file. Let's also check the total size of the file.
In [3]:
!du -sh data/imdb-comments.json
The total size of the file is 66 MB. Let's look at the first line.
In [4]:
!head -n 1 data/imdb-comments.json
Each line is a self-contained JSON document. Load the dataset using the Spark reader, specifying the file format as json. Since the file is 66 MB, we should have at least 2 partitions; because I am using a dual-core system, I will repartition the data to 4 partitions and cache the DataFrame after repartitioning.
In [5]:
imdb = spark.read.format("json").load("data/imdb-comments.json").repartition(4).cache()
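As a quick sanity check (a minimal sketch, not an executed cell; rdd.getNumPartitions() and DataFrame.storageLevel are standard Spark APIs), you can confirm that the repartition and cache settings took effect:
print(imdb.rdd.getNumPartitions())  # should print 4 after the repartition above
print(imdb.storageLevel)            # reflects the cache() request (materialized on the first action)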
Find the total number of records.
In [6]:
imdb.count()
Out[6]:
Print the schema and view the field types.
In [7]:
imdb.printSchema()
Take a look at a few sample records.
In [8]:
imdb.show()
label - indicates whether the record belongs to the training or test bucket. sentiment - indicates whether the comment carries a positive or negative sentiment; this column has been manually curated.
Find out how many records there are for each combination of label and sentiment.
In [9]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
In [10]:
imdb.groupBy("sentiment").pivot("label").count().show()
Look at a sample comment value
In [11]:
content = imdb.sample(False, 0.001, 1).first().content
content
Out[11]:
Register a UDF to strip HTML tags from the comments. If BeautifulSoup is not installed, you can install it with pip:
(shell command)
$ pip install BeautifulSoup4
In [12]:
from bs4 import BeautifulSoup
from pyspark.sql.types import *
import re
def remove_html_tags(text):
    text = BeautifulSoup(text, "html5lib").text.lower()  # strip HTML tags and lowercase
    text = re.sub(r"\W+", " ", text)                     # collapse non-word characters into single spaces
    return text
spark.udf.register("remove_html_tags", remove_html_tags, StringType())
Test the remove_html_tags function
In [13]:
remove_html_tags(content)
Out[13]:
Apply the UDF to the imdb DataFrame.
In [14]:
imdb_clean = imdb.withColumn("content", expr("remove_html_tags(content)")).cache()
imdb_clean.sample(False, 0.001, 1).first().content
Out[14]:
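The same cleanup can also be expressed through the DataFrame API by wrapping the function with pyspark.sql.functions.udf instead of calling it as a SQL expression (a sketch under the same imports; not run in this notebook):
from pyspark.sql.functions import udf, col
remove_html_tags_udf = udf(remove_html_tags, StringType())
# Equivalent to the expr("remove_html_tags(content)") call above
imdb_clean_alt = imdb.withColumn("content", remove_html_tags_udf(col("content")))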
Use Tokenizer to split the text into terms. Then use StopWordsRemover to remove stop words such as prepositions, and apply CountVectorizer to find all distinct terms and the count of each term per document.
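These stages can also be chained with pyspark.ml.Pipeline so they are fit and applied in one pass (a sketch using the same column names; the cells below apply each stage individually so the intermediate output can be inspected):
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer

text_pipeline = Pipeline(stages=[
    Tokenizer(inputCol="content", outputCol="terms"),
    StopWordsRemover(inputCol="terms", outputCol="filtered"),
    CountVectorizer(inputCol="filtered", outputCol="count_vectors"),
])
# pipeline_model = text_pipeline.fit(imdb_clean)
# count_vectorized = pipeline_model.transform(imdb_clean)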
In [15]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer, StopWordsRemover
In [16]:
tokenizer = Tokenizer(inputCol="content", outputCol="terms")
terms_data = tokenizer.transform(imdb_clean)
In [17]:
print(terms_data.sample(False, 0.001, 1).first().terms)
In [18]:
remover = StopWordsRemover(inputCol="terms", outputCol="filtered")
terms_stop_removed = remover.transform(terms_data)
In [19]:
print(terms_stop_removed.sample(False, 0.001, 1).first().filtered)
In [20]:
count_vectorizer = CountVectorizer(inputCol="filtered", outputCol="count_vectors")
count_vectorizer_model = count_vectorizer.fit(terms_stop_removed)
count_vectorized = count_vectorizer_model.transform(terms_stop_removed)
count_vectorized.sample(False, 0.001, 1).first().count_vectors
Out[20]:
The count_vectorized DataFrame contains a column count_vectors, a SparseVector representing which terms appear and how many times. The indices refer to positions in the vocabulary of all unique terms. You can find the list of terms in count_vectorizer_model.vocabulary. See below.
In [21]:
print(count_vectorizer_model.vocabulary[:100], "\n\nTotal no of terms", len(count_vectorizer_model.vocabulary))
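To make the indices concrete, here is a small sketch (not an executed cell) that maps the non-zero entries of one count_vectors value back to vocabulary terms using the indices and values attributes of the SparseVector:
vocab = count_vectorizer_model.vocabulary
row = count_vectorized.sample(False, 0.001, 1).first()
# Pair each non-zero index with its term and count, most frequent first
term_counts = {vocab[int(i)]: int(c)
               for i, c in zip(row.count_vectors.indices, row.count_vectors.values)}
print(sorted(term_counts.items(), key=lambda kv: -kv[1])[:10])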
In [22]:
count_vectorized.show()
The SparseVector has a length of 103999, which means there are 103999 unique terms in the dataset (corpus). Only a few of them are present in any given document. Find the density of each count_vectors value.
In [23]:
vocab_len = len(count_vectorizer_model.vocabulary)
spark.udf.register("density", lambda r: r.numNonzeros() / vocab_len, DoubleType())
count_vectorized.select(expr("density(count_vectors) density")).show()
The density report shows that count_vectors has very low density, which illustrates why a SparseVector (rather than a DenseVector) is the right representation for this column: a dense layout would mostly store zeros.
Now, calculate TF-IDF for each document.
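For reference, Spark's IDF uses the smoothed formula idf(t) = log((N + 1) / (df(t) + 1)), where N is the number of documents and df(t) is the number of documents containing term t; the TF-IDF weight is the raw term count multiplied by this factor. A small sketch of the arithmetic with made-up numbers (not part of the notebook):
import math
N = 50000                           # documents in the corpus
df_t = 500                          # hypothetical document frequency of one term
tf = 3                              # raw count of that term in one document
idf_t = math.log((N + 1) / (df_t + 1))
print(tf * idf_t)                   # the TF-IDF weight stored in the features vector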
In [24]:
idf = IDF(inputCol="count_vectors", outputCol="features")
idf_model = idf.fit(count_vectorized)
idf_data = idf_model.transform(count_vectorized)
In [25]:
idf_data.sample(False, 0.001, 1).first().features
Out[25]:
In [26]:
idf_data.printSchema()
Apply StringIndexer to convert the sentiment column from string type to numeric type - this is a prerequisite for applying the LogisticRegression algorithm.
In [27]:
from pyspark.ml.feature import StringIndexer
In [28]:
string_indexer = StringIndexer(inputCol="sentiment", outputCol="sentiment_idx")
string_indexer_model = string_indexer.fit(idf_data)
label_encoded = string_indexer_model.transform(idf_data)
In [29]:
label_encoded.select("sentiment", "sentiment_idx").show()
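The mapping learned by the StringIndexer can also be inspected directly (a sketch; labels are ordered by frequency, so index 0.0 corresponds to the most frequent sentiment):
print(string_indexer_model.labels)   # index 0 -> labels[0], index 1 -> labels[1]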
Split the data into training and testing sets with a 70/30 ratio. Cache the DataFrames so that training runs faster.
In [30]:
training, testing = label_encoded.randomSplit(weights=[0.7, 0.3], seed=1)
training.cache()
testing.cache()
Out[30]:
Verify that the StringIndexer has done the expected job and that the training and testing data maintain the same ratio of positive and negative records as the whole dataset.
In [31]:
training.groupBy("sentiment_idx", "sentiment").count().show()
In [32]:
testing.groupBy("sentiment_idx", "sentiment").count().show()
Apply the LogisticRegression classifier.
In [33]:
from pyspark.ml.classification import LogisticRegression
In [34]:
lr = LogisticRegression(maxIter=10000, regParam=0.1, elasticNetParam=0.0,
featuresCol="features", labelCol="sentiment_idx")
Show the parameters that the LogisticRegression classifier takes.
In [35]:
print(lr.explainParams())
In [36]:
lr_model = lr.fit(training)
In [37]:
lr_model.coefficients[:100]
Out[37]:
From the training summary, plot how the model's cost decayed over the iterations.
In [38]:
training_summary = lr_model.summary
In [39]:
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline
In [40]:
pd.Series(training_summary.objectiveHistory).plot()
plt.xlabel("Iteration")
plt.ylabel("Cost")
Out[40]:
Find the area under the ROC curve on the training data. A value closer to 1 is better.
In [41]:
training_summary.areaUnderROC
Out[41]:
In [42]:
predictions = lr_model.transform(testing).withColumn("match", expr("prediction == sentiment_idx"))
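Note that training_summary.areaUnderROC above is measured on the training data. A sketch (not executed here) of how the test-set AUC could be checked with BinaryClassificationEvaluator, which reads the rawPrediction column that LogisticRegression adds by default:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
auc_eval = BinaryClassificationEvaluator(labelCol="sentiment_idx",
                                         metricName="areaUnderROC")
# auc_eval.evaluate(predictions)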
In [43]:
predictions.select("prediction", "sentiment_idx", "sentiment", "match").sample(False, 0.01).show(10)
In [44]:
predictions.groupBy("sentiment_idx").pivot("prediction").count().show()
Find the accuracy of the predictions.
In [45]:
accuracy = predictions.select(expr("sum(cast(match as int))")).first()[0] / predictions.count()
accuracy
Out[45]:
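As a cross-check (a sketch, not executed in this notebook), the same accuracy can be computed with MulticlassClassificationEvaluator:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
acc_eval = MulticlassClassificationEvaluator(labelCol="sentiment_idx",
                                             predictionCol="prediction",
                                             metricName="accuracy")
# acc_eval.evaluate(predictions)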
In [ ]: